# NOTE: this listing was recovered from a web archive page; its navigation
# banner read "home *** CD-ROM | disk | FTP | other *** search".
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

import errno
import os
import re
import stat
from stat import *
import sys

import ufw.applications
import ufw.util
from ufw.common import UFWError, config_dir, UFWRule
from ufw.util import warn, debug

class UFWBackend:
    '''Interface for backends.

    Holds the state shared by every firewall backend: the parsed defaults,
    the ipv4/ipv6 rule lists, the application profiles and the paths of the
    configuration files. Concrete backends are expected to provide
    _read_rules() and _write_rules() and to override the methods at the
    bottom of this class that raise "need to override".
    '''

    def __init__(self, name, d, extra_files=None):
        '''Initialise the backend and load its configuration.

        name        -- backend name, stored as self.name
        d           -- dry-run flag, stored as self.dryrun
        extra_files -- optional dict of additional {key: path} entries merged
                       into self.files (may override the built-in entries)

        Any exception raised by the security checks, the defaults parser or
        the subclass's _read_rules() propagates to the caller.
        '''
        self.defaults = {}
        self.name = name
        self.dryrun = d
        self.rules = []
        self.rules6 = []

        self.files = {
            'defaults': os.path.join(config_dir, 'default/ufw'),
            'conf': os.path.join(config_dir, 'ufw/ufw.conf'),
            'apps': os.path.join(config_dir, 'ufw/applications.d'),
        }
        # None (rather than a shared {} default) means "no extra files".
        if extra_files:
            self.files.update(extra_files)

        self.loglevels = {
            'off': 0,
            'low': 100,
            'medium': 200,
            'high': 300,
            'full': 400,
        }

        self.do_checks = True
        self._do_checks()
        self._get_defaults()
        self._read_rules()

        self.profiles = ufw.applications.get_profiles(self.files['apps'])
        self.iptables_version = ufw.util.get_iptables_version()

    def _is_enabled(self):
        '''Return True when the parsed defaults contain enabled=yes.'''
        return self.defaults.get('enabled') == 'yes'

    def use_ipv6(self):
        '''Return True when ipv6=yes is configured and /proc/sys/net/ipv6
        exists.
        '''
        return (self.defaults.get('ipv6') == 'yes' and
                os.path.exists('/proc/sys/net/ipv6'))

    def _do_checks(self):
        """Perform basic security checks.

        - raise UFWError if the real and effective uid (or gid) differ,
          i.e. the script is running setuid or setgid
        - raise UFWError if the caller is not root
        - for every configured file, the running script and every
          non-hidden application profile, walk from the path up to '/' and
          warn (once per path and per kind) when a component is not owned
          by uid 0, is world writable or is group writable
        - raise UFWError if a component cannot be stat'ed or if a
          configured file other than 'apps' is not a regular file

        Returns True when the checks are disabled, otherwise None.

        Doing this at the beginning causes a race condition with later
        operations that don't do these checks. However, if the user running
        this script is root, then one needs to be root to exploit the race
        condition (and you are hosed anyway...)
        """
        if not self.do_checks:
            warn(_('Checks disabled'))
            return True

        if os.getuid() != os.geteuid():
            raise UFWError(_('ERROR: this script should not be SUID'))

        if os.getgid() != os.getegid():
            raise UFWError(_('ERROR: this script should not be SGID'))

        uid = os.getuid()
        if uid != 0:
            raise UFWError(_('You need to be root to run this script'))

        # Paths already warned about, so shared parents are reported once.
        warned_world_write = set()
        warned_group_write = set()
        warned_owner = set()

        apps_dir = self.files['apps']
        profiles = []
        if not os.path.isdir(apps_dir):
            warn(_("'%s' does not exist") % apps_dir)
        else:
            # Hidden entries (leading '.') are not checked.
            for profile in os.listdir(apps_dir):
                if not profile.startswith('.'):
                    profiles.append(os.path.join(apps_dir, profile))

        to_check = (list(self.files.values()) +
                    [os.path.abspath(sys.argv[0])] + profiles)
        for path in to_check:
            while True:
                debug('Checking ' + path)
                # A missing applications directory was already warned about
                # above; don't fail on it here.
                if path == apps_dir and not os.path.isdir(apps_dir):
                    break

                try:
                    statinfo = os.stat(path)
                except OSError:
                    raise UFWError(_("Couldn't stat '%s'") % path)
                mode = statinfo.st_mode

                if statinfo.st_uid != 0 and path not in warned_owner:
                    warn(_("uid is %s but '%s' is owned by %s") %
                         (str(uid), path, str(statinfo.st_uid)))
                    warned_owner.add(path)

                if mode & stat.S_IWOTH and path not in warned_world_write:
                    warn(_('%s is world writable!') % path)
                    warned_world_write.add(path)

                if mode & stat.S_IWGRP and path not in warned_group_write:
                    warn(_('%s is group writable!') % path)
                    warned_group_write.add(path)

                if path == '/':
                    break

                last_path = path
                path = os.path.dirname(path)
                if not path:
                    # Only reachable for a relative path; the original bare
                    # 'raise' had no active exception to re-raise here.
                    raise OSError(errno.ENOENT,
                                  "Could not find parent for '%s'" % last_path)

        for f in self.files:
            if f != 'apps' and not os.path.isfile(self.files[f]):
                raise UFWError(_("'%s' file '%s' does not exist") %
                               (f, self.files[f]))

    def _get_defaults(self):
        '''Get all settings from the defaults and conf files.

        Lines of the form NAME=value or NAME="value" are stored in
        self.defaults with lower-cased name and value and surrounding quotes
        removed. The conf file is read second, so its values win.

        Raises UFWError if either file cannot be opened.
        '''
        self.defaults = {}
        pat = re.compile(r'^\w+="?\w+"?')
        for f in (self.files['defaults'], self.files['conf']):
            try:
                orig = ufw.util.open_file_read(f)
            except Exception:
                raise UFWError(_("Couldn't open '%s' for reading") % f)

            try:
                for line in orig:
                    if pat.search(line):
                        tmp = line.strip().split('=')
                        self.defaults[tmp[0].lower()] = \
                            tmp[1].lower().strip('"\'')
            finally:
                orig.close()

    def set_default(self, f, opt, value):
        '''Set option 'opt' to 'value' in file 'f'.

        An existing "opt=" line is replaced, otherwise the setting is
        appended. self.defaults is updated with the lower-cased, unquoted
        value.

        Raises UFWError if 'opt' contains anything but word characters;
        errors from ufw.util.open_files() propagate unchanged.
        '''
        if not re.match(r'^[\w_]+$', opt):
            raise UFWError(_('Invalid option'))

        fns = ufw.util.open_files(f)
        fd = fns['tmp']

        new_line = opt + '=' + value + '\n'
        # opt was validated above, so it needs no regex escaping.
        pat = re.compile('^' + opt + '=')
        found = False
        for line in fns['orig']:
            if pat.search(line):
                os.write(fd, new_line)
                found = True
            else:
                os.write(fd, line)

        if not found:
            os.write(fd, new_line)

        # NOTE(review): close_files() presumably installs the temporary file
        # over the original, so it is deliberately not called on error paths.
        ufw.util.close_files(fns)
        self.defaults[opt.lower()] = value.lower().strip('"\'')

    def set_default_application_policy(self, policy):
        '''Set the default application policy of the firewall.

        'policy' is one of 'allow', 'deny', 'reject' or 'skip'. Nothing is
        written (and the policy is not validated) in dry-run mode.

        Returns a result string; raises UFWError for an unsupported policy.
        '''
        if not self.dryrun:
            targets = {
                'allow': '"ACCEPT"',
                'deny': '"DROP"',
                'reject': '"REJECT"',
                'skip': '"SKIP"',
            }
            if policy not in targets:
                raise UFWError(_("Unsupported policy '%s'") % policy)
            self.set_default(self.files['defaults'],
                             'DEFAULT_APPLICATION_POLICY', targets[policy])

        return _("Default application policy changed to '%s'") % policy

    def get_app_rules_from_template(self, template):
        '''Return a list of UFWRules based on the template rule.

        The template's dport and/or sport name an application profile; one
        rule is produced per port entry of the profile (or per dst/src
        combination when both sides name different profiles).

        Raises UFWError if no rule could be generated.
        '''
        rules = []
        has_dapp = template.dport in self.profiles
        has_sapp = template.sport in self.profiles

        if has_dapp and has_sapp:
            dports = ufw.applications.get_ports(self.profiles[template.dport])
            sports = ufw.applications.get_ports(self.profiles[template.sport])
            for i in dports:
                tmp = template.dup_rule()
                # The app name is blanked while the concrete port is set and
                # restored afterwards.
                tmp.dapp = ''
                tmp.set_port('any', 'src')
                (port, proto) = ufw.util.parse_port_proto(i)
                tmp.set_protocol(proto)
                tmp.set_port(port, 'dst')
                tmp.dapp = template.dapp

                if template.dport == template.sport:
                    # Same profile on both sides: reuse the dst entry for
                    # src instead of combining every dst with every src.
                    tmp.sapp = ''
                    (port, proto) = ufw.util.parse_port_proto(i)
                    tmp.set_protocol(proto)
                    tmp.set_port(port, 'src')
                    tmp.sapp = template.sapp
                    rules.append(tmp)
                else:
                    for j in sports:
                        rule = tmp.dup_rule()
                        rule.sapp = ''
                        (port, proto) = ufw.util.parse_port_proto(j)
                        rule.set_protocol(proto)
                        rule.set_port(port, 'src')
                        # Fall back to the dst side's protocol when the src
                        # entry does not specify one.
                        if rule.protocol == 'any':
                            rule.set_protocol(tmp.protocol)
                        rule.sapp = template.sapp
                        rules.append(rule)
        elif has_sapp:
            for p in ufw.applications.get_ports(self.profiles[template.sport]):
                rule = template.dup_rule()
                rule.sapp = ''
                (port, proto) = ufw.util.parse_port_proto(p)
                rule.set_protocol(proto)
                rule.set_port(port, 'src')
                rule.sapp = template.sapp
                rules.append(rule)
        elif has_dapp:
            for p in ufw.applications.get_ports(self.profiles[template.dport]):
                rule = template.dup_rule()
                rule.dapp = ''
                (port, proto) = ufw.util.parse_port_proto(p)
                rule.set_protocol(proto)
                rule.set_port(port, 'dst')
                rule.dapp = template.dapp
                rules.append(rule)

        if not rules:
            raise UFWError(_('No rules found for application profile'))

        return rules

    def update_app_rule(self, profile):
        '''Update rules for profile in place.

        Returns a (result string, bool) tuple; the bool tells whether the
        profile is used in the current ruleset. Raises UFWError if the
        regenerated rules cannot be written.
        '''
        updated_rules = []
        updated_rules6 = []
        last_tuple = ''
        rstr = ''
        updated_profile = False

        for r in self.rules + self.rules6:
            if r.dapp == profile or r.sapp == profile:
                # Only the previous tuple is remembered, so the rules of one
                # application tuple must be adjacent in the list.
                app_tuple = r.get_app_tuple()
                if app_tuple == last_tuple:
                    continue

                # New tuple: regenerate its rules from the profile.
                template = r.dup_rule()
                template.set_protocol('any')
                if template.dapp != '':
                    template.set_port(template.dapp, 'dst')
                if template.sapp != '':
                    template.set_port(template.sapp, 'src')

                for new_r in self.get_app_rules_from_template(template):
                    new_r.normalize()
                    if new_r.v6:
                        updated_rules6.append(new_r)
                    else:
                        updated_rules.append(new_r)

                last_tuple = app_tuple
                updated_profile = True
            elif r.v6:
                updated_rules6.append(r)
            else:
                updated_rules.append(r)

        if updated_profile:
            self.rules = updated_rules
            self.rules6 = updated_rules6
            rstr += _("Rules updated for profile '%s'") % profile

            try:
                self._write_rules(False)
                self._write_rules(True)
            except Exception:
                raise UFWError(_("Couldn't update application rules"))

        return (rstr, updated_profile)

    def find_application_name(self, str):
        '''Find the application profile name for str.

        An exact match wins; otherwise a unique case-insensitive match is
        returned. Raises UFWError when there is no match or when several
        profiles match case-insensitively.
        '''
        if str in self.profiles:
            return str

        wanted = str.lower()
        candidates = [n for n in self.profiles if n.lower() == wanted]

        debug("'%d' matches for '%s'" % (len(candidates), str))
        if len(candidates) == 1:
            return candidates[0]

        if len(candidates) > 1:
            # Previously this message was overwritten by the "could not
            # find" message below before being raised.
            raise UFWError(_("Found multiple matches for '%s'. Please use exact profile name") % str)

        raise UFWError(_("Could not find a profile matching '%s'") % str)

    def find_other_position(self, position, v6):
        """Return the absolute position in the other list of the rule with
        the user position of the given list. For example,
        find_other_position(4, True) will return the absolute position of
        the rule in the ipv4 list matching the user specified '4' rule in
        the ipv6 list.

        Returns 0 when the other list has no matching rule. Raises
        ValueError when 'position' is below 1 or beyond the end of the
        selected list.
        """
        if v6:
            rules = self.rules6
            others = self.rules
        else:
            rules = self.rules
            others = self.rules6

        if position > len(rules) or position < 1:
            raise ValueError()

        # Count the repeated application tuples among the first 'position'
        # entries; that count is used as an offset into the list below.
        seen_tuples = set()
        tuple_offset = 0
        for r in rules[:position]:
            if r.dapp != '' or r.sapp != '':
                app_tuple = r.get_app_tuple()
                if app_tuple in seen_tuples:
                    tuple_offset += 1
                else:
                    seen_tuples.add(app_tuple)

        match_rule = rules[(position - 1) + tuple_offset].dup_rule()
        match_rule.set_v6(not v6)

        for count, r in enumerate(others):
            if UFWRule.match(r, match_rule) == 0:
                return count + 1

        return 0

    def get_loglevel(self):
        '''Get the current log level of the firewall.

        Returns (level, description); level is -1 when the configured
        loglevel is missing or unknown.
        '''
        rstr = _('Logging: ')
        name = self.defaults.get('loglevel')
        if name not in self.loglevels:
            level = -1
            rstr += _('unknown')
        else:
            level = self.loglevels[name]
            if level == 0:
                rstr += 'off'
            else:
                rstr += 'on (%s)' % name
        return (level, rstr)

    def set_loglevel(self, level):
        '''Set the log level of the firewall.

        'level' is a key of self.loglevels or 'on'. 'on' keeps the
        configured level, or uses 'low' when logging is unset or 'off'.

        Returns a result string; raises UFWError for an invalid level.
        '''
        if level != 'on' and level not in self.loglevels:
            raise UFWError(_("Invalid log level '%s'") % level)

        new_level = level
        if level == 'on':
            current = self.defaults.get('loglevel')
            if current is None or current == 'off':
                new_level = 'low'
            else:
                new_level = current

        self.set_default(self.files['conf'], 'LOGLEVEL', new_level)
        self.update_logging(new_level)

        if new_level == 'off':
            return _('Logging disabled')
        return _('Logging enabled')

    def get_rules_count(self, v6):
        '''Return number of ufw rules (not iptables rules).

        Rules sharing an application tuple are counted once.
        '''
        if v6:
            rules = self.rules6
        else:
            rules = self.rules

        count = 0
        seen_tuples = set()
        for r in rules:
            if r.dapp != '' or r.sapp != '':
                app_tuple = r.get_app_tuple()
                if app_tuple in seen_tuples:
                    debug("Skipping found tuple '%s'" % app_tuple)
                    continue
                seen_tuples.add(app_tuple)
            count += 1

        return count

    # The methods below must be overridden by concrete backends.

    def get_default_policy(self):
        raise UFWError('UFWBackend.get_default_policy: need to override')

    def set_default_policy(self, policy):
        raise UFWError('UFWBackend.set_default_policy: need to override')

    def get_running_raw(self):
        raise UFWError('UFWBackend.get_running_raw: need to override')

    def get_status(self, verbose, show_count):
        raise UFWError('UFWBackend.get_status: need to override')

    def get_status_as_list(self):
        raise UFWError('UFWBackend.get_status_as_list: need to override')

    def set_rule(self, rule, allow_reload):
        raise UFWError('UFWBackend.set_rule: need to override')

    def start_firewall(self):
        raise UFWError('UFWBackend.start_firewall: need to override')

    def stop_firewall(self):
        raise UFWError('UFWBackend.stop_firewall: need to override')

    def get_app_rules_from_system(self, template, v6):
        raise UFWError('UFWBackend.get_app_rules_from_system: need to override')

    def update_logging(self, level):
        raise UFWError('UFWBackend.update_logging: need to override')